{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "ec9c4962-e6d5-4029-9913-52dfd34eefd2",
   "metadata": {},
   "outputs": [],
   "source": [
    "%session_id_prefix native-delta-dataframe-\n",
    "%glue_version 3.0\n",
    "%idle_timeout 60\n",
    "%%configure\n",
    "{\n",
    "  \"--conf\": \"spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog\",\n",
    "  \"--extra-py-files\": \"/opt/aws_glue_connectors/selected/delta-core_2.12-1.0.0.jar\",\n",
    "  \"--datalake-formats\": \"delta\"\n",
    "}"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "74b7200d-2175-4f2f-b7ff-569ce57fd192",
   "metadata": {},
   "outputs": [],
   "source": [
    "bucket_name = \"<Your S3 bucket name>\"\n",
    "bucket_prefix = \"<Your S3 bucket prefix>\"\n",
    "database_name = \"delta_dataframe\"\n",
    "database_prefix = f\"{bucket_prefix}/{database_name}\"\n",
    "database_location = f\"s3://{bucket_name}/{database_prefix}/\"\n",
    "table_name = \"products\"\n",
    "table_prefix = f\"{database_prefix}/{table_name}\"\n",
    "table_location = f\"s3://{bucket_name}/{table_prefix}/\""
   ]
  },
  {
   "cell_type": "markdown",
   "id": "7e7eab99-9d36-4b5b-8eb0-d7b935351750",
   "metadata": {},
   "source": [
    "## Clean up existing resources"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "62dc44d1-4c48-4f24-bfce-60637972914b",
   "metadata": {},
   "outputs": [],
   "source": [
    "import boto3\n",
    "\n",
    "## Delete files in S3\n",
    "s3 = boto3.resource('s3')\n",
    "bucket = s3.Bucket(bucket_name)\n",
    "bucket.objects.filter(Prefix=f\"{table_prefix}/\").delete()\n",
    "\n",
    "## Drop tables in Glue Data Catalog\n",
    "try:\n",
    "    glue = boto3.client('glue')\n",
    "    glue.delete_table(DatabaseName=database_name, Name=table_name)\n",
    "except glue.exceptions.EntityNotFoundException:\n",
    "    print(f\"Table {database_name}.{table_name} does not exist\")\n",
    "try:\n",
    "    glue = boto3.client('glue')\n",
    "    glue.delete_table(DatabaseName=database_name, Name='testTable')\n",
    "except glue.exceptions.EntityNotFoundException:\n",
    "    print(f\"Table {database_name}.testTable does not exist\")\n"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "08706080-9af8-4721-bdfa-0872e0407c68",
   "metadata": {},
   "source": [
    "## Create Delta table with sample data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "0ba30946",
   "metadata": {},
   "outputs": [],
   "source": [
    "try:\n",
    "    glue = boto3.client('glue')\n",
    "    res = glue.get_database(Name=database_name)\n",
    "    print(f\"Database {database_name} exists.\")\n",
    "    if 'LocationUri' not in res['Database']:\n",
    "        print(f\"Warning: Database {database_name} does not have Location. You need to configure location in the database.\")\n",
    "except glue.exceptions.EntityNotFoundException:\n",
    "    print(f\"Database {database_name} does not exist.\")\n",
    "    glue = glue.create_database(\n",
    "        DatabaseInput={\n",
    "            'Name': database_name,\n",
    "            'LocationUri': database_location\n",
    "        }\n",
    "    )\n",
    "    print(f\"Created a new database {database_name}.\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "1d241e37-0ab5-4e1d-9ec1-fd428bc865e8",
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql import Row\n",
    "import time\n",
    "\n",
    "ut = time.time()\n",
    "\n",
    "product = [\n",
    "    {'product_id': '00001', 'product_name': 'Heater', 'price': 250, 'category': 'Electronics', 'updated_at': ut},\n",
    "    {'product_id': '00002', 'product_name': 'Thermostat', 'price': 400, 'category': 'Electronics', 'updated_at': ut},\n",
    "    {'product_id': '00003', 'product_name': 'Television', 'price': 600, 'category': 'Electronics', 'updated_at': ut},\n",
    "    {'product_id': '00004', 'product_name': 'Blender', 'price': 100, 'category': 'Electronics', 'updated_at': ut},\n",
    "    {'product_id': '00005', 'product_name': 'USB charger', 'price': 50, 'category': 'Electronics', 'updated_at': ut}\n",
    "]\n",
    "\n",
    "df_products = spark.createDataFrame(Row(**x) for x in product)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "9e4eca5d-b71a-43f4-963a-7841fff73c8a",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Create table in the metastore using DataFrame's schema and write data to it\n",
    "df_products.write.format(\"delta\").mode(\"overwrite\").option(\"path\",table_location).saveAsTable(f\"{database_name}.{table_name}\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "a0a38c88",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Create a Deltatable with path (not in metastore) using DataFrame's schema and write/overwrite data to it\n",
    "df_products.write.format(\"delta\").mode(\"overwrite\").save(table_location)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "d46e5251",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Create a Deltatable using the DeltaTableBuilder API using a dataframe's schema\n",
    "from delta.tables import DeltaTable\n",
    "deltaTable = DeltaTable.create(spark).tableName(f\"{database_name}.testTable\").addColumns(df_products.schema).location(table_location+\"_testTable\").execute()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "fda19a10-4b69-4272-99a2-1b1156e937c2",
   "metadata": {},
   "source": [
    "## Read from Delta table via DataFrame"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "14a6f45f-1cf6-4f6f-9e63-4c89d6ce2cbd",
   "metadata": {},
   "outputs": [],
   "source": [
    "# query table in the metastore\n",
    "df_products_read = spark.table(f\"{database_name}.{table_name}\")\n",
    "df_products_read.show()\n",
    "\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "d142f31d",
   "metadata": {},
   "outputs": [],
   "source": [
    "# query table by path\n",
    "df_products_read = spark.read. \\\n",
    "    format(\"delta\"). \\\n",
    "    load(table_location)\n",
    "df_products_read.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "86dc4db0-ce1d-40d7-bbc8-b32e0e769d43",
   "metadata": {},
   "source": [
    "## Read from Delta table via DeltaLake library"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "7057cb91-d80e-4807-a3fd-faf4575adba1",
   "metadata": {},
   "outputs": [],
   "source": [
    "from delta.tables import *\n",
    "\n",
    "deltaTable = DeltaTable.forPath(spark, table_location) #query table from path\n",
    "deltaTable.toDF().show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "ab642097",
   "metadata": {},
   "outputs": [],
   "source": [
    "deltaTable = DeltaTable.forName(spark,f\"{database_name}.{table_name}\") #query table from metastore\n",
    "deltaTable.toDF().show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "59bddfcf",
   "metadata": {},
   "outputs": [],
   "source": [
    "deltaTabletest = DeltaTable.forName(spark,f\"{database_name}.testTable\").toDF().show()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "28f357f7",
   "metadata": {},
   "source": [
    "## Modify schema/properties of DeltaLake table"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "ffa9b669",
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql.functions import lit\n",
    "\n",
    "df_products.withColumn(\"Currency\",lit(\"USD\")).write \\\n",
    "    .format(\"delta\") \\\n",
    "    .mode(\"overwrite\") \\\n",
    "    .option(\"overwriteSchema\", \"true\") \\\n",
    "    .option(\"path\", table_location) \\\n",
    "    .saveAsTable(f\"{database_name}.{table_name}\") # External table"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "0b80b1cb",
   "metadata": {},
   "source": [
    "## Insert records"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "8550f5ff",
   "metadata": {},
   "outputs": [],
   "source": [
    "ut = time.time()\n",
    "product_insert = [\n",
    "    {'product_id': '00006', 'product_name': 'Book', 'price': 500, 'category': 'Stationery','Currency': 'INR', 'updated_at': ut}, # Insert\n",
    "    {'product_id': '00007', 'product_name': 'Pen', 'price': 50, 'category': 'Stationery','Currency': 'USD', 'updated_at': ut} # Insert\n",
    "]\n",
    "df_product_insert = spark.createDataFrame(Row(**x) for x in product_insert)\n",
    "\n",
    "df_product_insert.write.format(\"delta\").mode(\"append\").saveAsTable(f\"{database_name}.{table_name}\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "9cc12691",
   "metadata": {},
   "source": [
    "## Update records"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "50483876",
   "metadata": {},
   "outputs": [],
   "source": [
    "from delta.tables import *\n",
    "from pyspark.sql.functions import *\n",
    "\n",
    "deltaTable = DeltaTable.forName(spark, f\"{database_name}.{table_name}\")\n",
    "\n",
    "# Declare the predicate by using a SQL-formatted string.\n",
    "deltaTable.update(\n",
    "    condition = \"product_id = '00006'\",\n",
    "    set = { \"Currency\": \"'USD'\" }\n",
    ")\n"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "011c5d64",
   "metadata": {},
   "source": [
    "## Delete records"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "7e2f08a8",
   "metadata": {},
   "outputs": [],
   "source": [
    "# delete product_id 00007\n",
    "\n",
    "deltaTable = DeltaTable.forName(spark, f\"{database_name}.{table_name}\")\n",
    "deltaTable.delete(\"product_id = '00007'\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "c013d49c-da63-4910-b423-4ebd0e346e1f",
   "metadata": {},
   "source": [
    "## Upsert records into Delta table"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "97c52ecd-ac33-4178-b41d-ede0db0b1c97",
   "metadata": {},
   "outputs": [],
   "source": [
    "ut = time.time()\n",
    "\n",
    "product_updates = [\n",
    "    {'product_id': '00001', 'product_name': 'Heater', 'price': 400, 'category': 'Electronics', 'Currency': 'INR','updated_at': ut}, # Update\n",
    "    {'product_id': '00007', 'product_name': 'Chair', 'price': 50, 'category': 'Furniture', 'Currency': 'INR','updated_at': ut} # Insert\n",
    "]\n",
    "df_product_updates = spark.createDataFrame(Row(**x) for x in product_updates)\n",
    "\n",
    "deltaTable.alias(\"products\").merge(\n",
    "    df_product_updates.alias(\"updates\"),\n",
    "    \"products.product_id = updates.product_id\") \\\n",
    "    .whenMatchedUpdate(set = {\n",
    "        \"product_name\": \"updates.product_name\",\n",
    "        \"price\": \"updates.price\",\n",
    "        \"category\": \"updates.category\",\n",
    "        \"updated_at\": \"updates.updated_at\",\n",
    "        \"Currency\": \"updates.Currency\"\n",
    "    } ) \\\n",
    "    .whenNotMatchedInsert(values = {\n",
    "        \"product_id\": \"updates.product_id\",\n",
    "        \"product_name\": \"updates.product_name\",\n",
    "        \"price\": \"updates.price\",\n",
    "        \"category\": \"updates.category\",\n",
    "        \"updated_at\": \"updates.updated_at\",\n",
    "        \"Currency\": \"updates.Currency\"\n",
    "    }\n",
    ") \\\n",
    ".execute()\n",
    "deltaTable.toDF().show()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "885a8de2",
   "metadata": {},
   "source": [
    "## View History"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "a646a330",
   "metadata": {},
   "outputs": [],
   "source": [
    "from delta.tables import *\n",
    "\n",
    "deltaTable = DeltaTable.forPath(spark, table_location)\n",
    "\n",
    "fullHistoryDF = deltaTable.history() \n",
    "fullHistoryDF.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "7ac51f2b",
   "metadata": {},
   "source": [
    "## Query with time travel"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "1b5861f9",
   "metadata": {},
   "outputs": [],
   "source": [
    "df1 = spark.read.format(\"delta\").option(\"timestampAsOf\", \"2022-04-28 12:44:07\").load(table_location) #timestamp in YYYY-MM-DD HH:MM:SS\n",
    "df2 = spark.read.format(\"delta\").option(\"versionAsOf\", 3).load(table_location)\n",
    "df1.show()\n",
    "df2.show()\n"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "4a9a0ae7",
   "metadata": {},
   "source": [
    "# Roll Back"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "422e870d",
   "metadata": {},
   "outputs": [],
   "source": [
    "#Restore to version 2 using dataframe\n",
    "\n",
    "spark.read.format(\"delta\") \\\n",
    "    .option(\"versionAsOf\", 2) \\\n",
    "    .load(table_location) \\\n",
    "    .write \\\n",
    "    .mode(\"overwrite\") \\\n",
    "    .format(\"delta\") \\\n",
    "    .save(table_location)\n"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "dee36fbb",
   "metadata": {},
   "source": [
    "## Stop Session"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "e4fd69d3",
   "metadata": {},
   "outputs": [],
   "source": [
    "%stop_session"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Glue PySpark",
   "language": "python",
   "name": "glue_pyspark"
  },
  "toc-autonumbering": true,
  "toc-showcode": true,
  "toc-showmarkdowntxt": true,
  "toc-showtags": true
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
